package penging.rocketmq;


import com.alibaba.rocketmq.client.exception.MQClientException;
import com.alibaba.rocketmq.client.producer.DefaultMQProducer;
import com.alibaba.rocketmq.client.producer.SendResult;
import com.alibaba.rocketmq.common.message.Message;
import com.penging.infra.util.ApplicationUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import java.util.ArrayList;
import java.util.List;

/**
 * 生产者，初始化MQProducer
 */

public class Producer implements DatabaseInterface{

    private Logger logger= LoggerFactory.getLogger(Producer.class);

    private List<DefaultMQProducer> producerList = new ArrayList<DefaultMQProducer>();




    /**
     * 启动RocketMq生产者
     */
    public DefaultMQProducer startProducer(String groupName) throws MQClientException {
        logger.info("启动RocketMq生产者..{}" + groupName);
        String nameSrvAddr = ApplicationUtil.getApplicationContext().getEnvironment().getProperty(SYSTEM_NAMESRV_ADDR);
        DefaultMQProducer producer = new DefaultMQProducer(groupName);
        producer.setNamesrvAddr(nameSrvAddr);
        try {
            producer.start();
        } catch (Exception e) {
            logger.error("启动RocketMq生产者异常",e);
            throw e;
        }
        logger.info("启动RocketMq生产者成功..{}"+groupName);
        return producer;
    }



    /**
     * 发送消息
     * @param groupName 消息组
     * @param key 业务id
     * @param body 消息内容
     * @return
     */
    public SendResult sendMsg(String groupName ,String key ,String body) throws Exception {

        if(null == groupName || groupName.isEmpty() || null == body || body.isEmpty()){
            return null;
        }

        Message msg = new Message(groupName,
                "push",
                key,
                body.getBytes());

        int isStart = 0;//标示消息组是否已经启动
        try {
            if(producerList.isEmpty()){
                //消息组未启动
                DefaultMQProducer mqProducer = startProducer(groupName);
                producerList.add(mqProducer);
                return mqProducer.send(msg);
            }else {
                for(DefaultMQProducer p : producerList){
                    //如果找到匹配的组，则发消息
                    if(groupName.equals(p.getProducerGroup())){
                        return p.send(msg);
                    }else {
                        isStart=1;//消息组未启动
                    }
                }
            }

            if(isStart == 1){
                //消息组未启动
                DefaultMQProducer mqProducer = startProducer(groupName);
                producerList.add(mqProducer);
                return mqProducer.send(msg);
            }

        }catch (Exception e ){
            logger.error("发送消息异常",e);
            throw e;
        }

        return null;

    }

}